1 module hunt.cache.rocksdb; 2 3 import hunt.cache.cache; 4 import hunt.cache.nullable; 5 import hunt.cache.store; 6 7 8 import std.string; 9 import core.time; 10 import core.stdc.time; 11 import core.stdc.string; 12 import core.thread; 13 import std.file; 14 15 version(SUPPORT_ROCKSDB){ 16 17 import rocksdb; 18 19 class RocksdbCache 20 { 21 this(string dir){ 22 23 create(dir); 24 } 25 26 ~this(){ 27 _rocksdb.close(); 28 } 29 30 Nullable!V get(V)(string key) 31 { 32 synchronized(this){ 33 auto data = _rocksdb.get(cast(ubyte[])key); 34 35 return get_inter!V(data); 36 37 } 38 } 39 40 Nullable!V[string] getall(V)(string[] key) 41 { 42 synchronized(this){ 43 Nullable!V[string] mapv; 44 ubyte[][] datas = _rocksdb.multiGet(cast(ubyte[][])key); 45 foreach(i , d ; datas) 46 { 47 mapv[key[i]] = get_inter!V(d); 48 } 49 return mapv; 50 } 51 } 52 53 bool containsKey(string key) 54 { 55 synchronized(this){ 56 auto data = _rocksdb.get(cast(ubyte[])key); 57 if(data == null ) 58 return false; 59 if(check_is_expired(data)) 60 { 61 _rocksdb.remove(cast(ubyte[])key); 62 return false; 63 } 64 return true; 65 } 66 } 67 68 69 void put(V)(string key , V v , uint expired = 0) 70 { 71 synchronized(this){ 72 _rocksdb.put(cast(ubyte[])key , 73 generator_expired(expired) ~ cast(ubyte[]) SerializeToByte!V(v)); 74 } 75 } 76 77 // rocksdb no putifaabsent , so this function not atomic. 78 bool putifAbsent(V)(string key , V v) 79 { 80 synchronized(this){ 81 auto data = _rocksdb.get(cast(ubyte[])key); 82 if(data == null || check_is_expired(data)) 83 { 84 put(key , v); 85 return true; 86 } 87 return false; 88 } 89 } 90 91 void putAll(V)( V[string] maps , uint expired) 92 { 93 synchronized(this){ 94 string[] datas; 95 if(maps.length == 0) 96 return; 97 auto expired_data = generator_expired(expired); 98 _rocksdb.withBatch((batch){ 99 foreach(k , v ; maps) 100 batch.put(cast(ubyte[])k , 101 expired_data ~ cast(ubyte[])SerializeToByte(v)); 102 }); 103 } 104 } 105 106 107 bool remove(string key) 108 { 109 synchronized(this){ 110 // rocksdb's remove api not return the value. 111 auto data = _rocksdb.get(cast(ubyte[])key); 112 if(data == null ) 113 { 114 return false; 115 } 116 117 if(check_is_expired(data)) 118 { 119 _rocksdb.remove(cast(ubyte[])key); 120 return false; 121 } 122 123 _rocksdb.remove(cast(ubyte[])key); 124 return true; 125 } 126 127 } 128 129 void removeAll(string[] keys) 130 { 131 synchronized(this){ 132 foreach(k ; keys){ 133 _rocksdb.remove(cast(ubyte[])k); 134 } 135 } 136 } 137 138 void clear() 139 { 140 _rocksdb.close(); 141 std.file.rmdirRecurse(_dir); 142 create(_dir); 143 144 } 145 146 147 protected: 148 149 150 void create(string dir) 151 { 152 auto opts = new DBOptions; 153 opts.createIfMissing = true; 154 opts.errorIfExists = false; 155 156 _rocksdb = new Database(opts , dir); 157 _dir = dir; 158 } 159 160 161 162 Nullable!V get_inter(V)(ubyte[] data) 163 { 164 if(data == null) 165 return Nullable!V.init; 166 167 if(check_is_expired(data)) 168 { 169 _rocksdb.remove(cast(ubyte[])data); 170 return Nullable!V.init; 171 } 172 173 return DeserializeToObject!V(cast(byte[])data[8 .. $]); 174 } 175 176 177 ubyte[] generator_expired(uint expired) 178 { 179 byte[8] byExpired; 180 if(expired == 0) 181 { 182 return cast(ubyte[])byExpired.idup; 183 } 184 else 185 { 186 ulong stamp = time(null) + expired; 187 memcpy(byExpired.ptr ,cast(void *)&stamp , byExpired.sizeof); 188 } 189 return cast(ubyte[])byExpired.idup; 190 } 191 192 ulong get_expired(ubyte[] data) 193 { 194 ulong stamp; 195 memcpy(&stamp , data.ptr , 8); 196 return stamp; 197 } 198 199 bool check_is_expired(ubyte[] data) 200 { 201 ulong stamp = time(null); 202 ulong expired = get_expired(data); 203 if( expired > 0 && expired < stamp ){ 204 return true; 205 } 206 return false; 207 } 208 209 210 Database _rocksdb; 211 string _dir; 212 213 } 214 215 216 }